package com.it.cloud.producer;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;


/**
 * 生产者--带回调
 */
public class BasicCallbackProducer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // key序列化
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // value序列化
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        // ------ 可选配置 -------
        // 确认模式
        props.put(ProducerConfig.ACKS_CONFIG, "1");
        // 重试次数，0为不启用重试机制
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        // 控制批处理大小，单位为字节
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        // 批量发送，延迟为1毫秒，启用该功能能有效减少生产者发送消息次数，从而提高并发量
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        // 生产者可以使用的总内存字节来缓冲等待发送到服务器的记录
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);

        Producer<String, String> producer = new KafkaProducer<>(props);

        // ProducerRecord消息封装类
        ProducerRecord<String, String> record;
        for (int i = 0; i < 10; i++) {
            record = new ProducerRecord<String, String>("my-topic", null,
                    System.currentTimeMillis(), Integer.toString(i), Integer.toString(i));

            // 发送消息时指定一个callback, 并覆盖onCompletion()方法，在成功发送后获取消息的偏移量及分区
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metaData, Exception exception) {
                    // 记录异常信息
                    if (null != exception) {
                        System.out.println("Send message exception." + exception);
                    }
                    if (null != metaData) {
                        System.out.println(String.format("offset:%s,partition:%s",
                                metaData.offset(), metaData.partition()));
                    }
                }
            });
        }

        producer.close();
    }
}
